In [55]:
! rm -f ratings.dat
! wget https://raw.githubusercontent.com/snowch/movie-recommender-demo/master/web_app/data/ratings.dat
Let's check the structure of the data ...
In [56]:
! head -3 ratings.dat
! echo
! tail -3 ratings.dat
Ok, so we should have:
UserID::MovieID::Rating::Timestamp
Now load the data into an RDD:
In [57]:
from pyspark.mllib.recommendation import Rating
ratingsRDD = sc.textFile('ratings.dat') \
                .map(lambda l: l.split("::")) \
                .map(lambda p: Rating(
                    user    = int(p[0]),
                    product = int(p[1]),
                    rating  = float(p[2])
                )).cache()
It's useful to check some high-level statistics on the data. For example, we can see that the ratings range from 1.0 to 5.0.
In [58]:
ratingsRDD.toDF().describe().show()
First we zoom in to visualize the ratings for a subset of the data (user ID < 20 and movie ID < 20). We should see a sparse matrix.
After that, we can zoom back out to look at the ratings for all of the users across all of the movies.
Let's take a subset of the data
In [59]:
ratings = ratingsRDD.filter(lambda x: x.user < 20 and x.product < 20).toDF()
Separate out the movie (x) values and the user (y) values for matplotlib.
Also normalise the rating values so that they fall between 0 and 1. This is required for coloring the markers.
In [60]:
from pyspark.sql.functions import min, max
user = ratings.map(lambda x: int(x.user)).cache()
movie = ratings.map(lambda x: int(x.product)).cache()
min_r = ratings.select(min('rating')).take(1)[0]['min(rating)']
max_r = ratings.select(max('rating')).take(1)[0]['max(rating)']
def normalise(x):
    rating = (x - min_r) / (max_r - min_r)
    return float(rating)
ratingN = ratings.map(lambda x: normalise(x.rating)).cache()
We can now plot the sparse matrix of ratings for this subset of users and movies.
In [61]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
min_user = ratings.select(min('user')).take(1)[0]['min(user)']
max_user = ratings.select(max('user')).take(1)[0]['max(user)']
min_movie = ratings.select(min('product')).take(1)[0]['min(product)']
max_movie = ratings.select(max('product')).take(1)[0]['max(product)']
width = 5
height = 5
plt.figure(figsize=(width, height))
plt.ylim([min_user-1,max_user+1])
plt.xlim([min_movie-1,max_movie+1])
plt.yticks(np.arange(min_user-1, max_user+1, 1))
plt.xticks(np.arange(min_movie-1, max_movie+1, 1))
plt.xlabel('Movie ID')
plt.ylabel('User ID')
plt.title('Movie Ratings')
ax = plt.gca()
ax.patch.set_facecolor('#898787') # dark grey background
colors = plt.cm.YlOrRd(ratingN.collect())
plt.scatter(
    movie.collect(),
    user.collect(),
    s=50,
    marker="s",
    color=colors,
    edgecolor=colors)
plt.legend(
    title='Rating',
    loc="upper left",
    bbox_to_anchor=(1, 1),
    handles=[
        mpatches.Patch(color=plt.cm.YlOrRd(0),    label='1'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.25), label='2'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.5),  label='3'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.75), label='4'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.99), label='5')
    ])
plt.show()
In the plot, you can read off the rating from the color coding. For example, user 1 has rated movie 1 with the highest rating of 5.
Let's dump the dataset to double check ...
In [62]:
ratings.collect()
Out[62]:
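Rather than collecting the whole subset, we could also query that one cell directly. A minimal sketch using the same ratings DataFrame:
In [ ]:
ratings.filter('user = 1 AND product = 1').show()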
The plot is as expected, so we can repeat this with the full data set.
This time we don't need to filter the RDD.
In [63]:
ratings = ratingsRDD.toDF()
Same functions as before ...
In [64]:
from pyspark.sql.functions import min, max
user = ratings.map(lambda x: int(x.user)).cache()
movie = ratings.map(lambda x: int(x.product)).cache()
min_r = ratings.select(min('rating')).take(1)[0]['min(rating)']
max_r = ratings.select(max('rating')).take(1)[0]['max(rating)']
def normalise(x):
    rating = (x - min_r) / (max_r - min_r)
    return float(rating)
ratingN = ratings.map(lambda x: normalise(x.rating)).cache()
A slightly modified chart, this time with smaller markers.
In [65]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
max_user = ratings.select(max('user')).take(1)[0]['max(user)']
max_movie = ratings.select(max('product')).take(1)[0]['max(product)']
width = 10
height = 10
plt.figure(figsize=(width, height))
plt.ylim([0,max_user])
plt.xlim([0,max_movie])
plt.ylabel('User ID')
plt.xlabel('Movie ID')
plt.title('Movie Ratings')
ax = plt.gca()
ax.patch.set_facecolor('#898787') # dark grey background
colors = plt.cm.YlOrRd(ratingN.collect())
plt.scatter(
    movie.collect(),
    user.collect(),
    s=1,
    edgecolor=colors)
plt.legend(
    title='Rating',
    loc="upper left",
    bbox_to_anchor=(1, 1),
    handles=[
        mpatches.Patch(color=plt.cm.YlOrRd(0),    label='1'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.25), label='2'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.5),  label='3'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.75), label='4'),
        mpatches.Patch(color=plt.cm.YlOrRd(0.99), label='5')
    ])
plt.show()
We can see some clear patterns. A vertical line could indicate that a movie is rated similarly by all users.
A horizontal line could indicate that a user rates all movies fairly similarly: a pale line suggests they tend to rate movies low, a dark red line that they tend to rate them high.
There are also some interesting grey patterns where users have not rated movies. Notice the grey arc at the top right of the plot.
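One way to sanity-check these intuitions is to look at per-movie and per-user rating statistics. The following is a rough sketch, not part of the original notebook; it assumes Spark 1.6+ for the stddev function, and the minimum count of 20 is an arbitrary cut-off:
In [ ]:
from pyspark.sql.functions import avg, count, stddev

# Movies rated similarly by all users should show a low standard deviation.
ratings.groupBy('product') \
    .agg(count('rating').alias('n'), avg('rating').alias('mean'), stddev('rating').alias('sd')) \
    .filter('n >= 20') \
    .orderBy('sd') \
    .show(10)

# Users who rate everything about the same should also show a low standard deviation.
ratings.groupBy('user') \
    .agg(count('rating').alias('n'), avg('rating').alias('mean'), stddev('rating').alias('sd')) \
    .filter('n >= 20') \
    .orderBy('sd') \
    .show(10)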
ALS generates User Features and Product Features from the rating data by factorizing the ratings matrix.
The Product Features and User Features are referred to as latent factors because we don't know (or care) what they actually represent: their meaning is hidden (latent) from us. The number of latent features is also called the rank. In the training step below, we assume that there are five features, so the rank is 5.
It may help you intuitively if you think of the latent features as representing movie attributes such as genre, actors or release date.
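To make this concrete, here is a toy illustration with made-up numbers (these are not values produced by ALS): a 2 x 2 ratings matrix approximated by multiplying a user-feature matrix with the transpose of a product-feature matrix.
In [ ]:
import numpy as np

# Hypothetical rank-2 latent factors for 2 users and 2 movies (made-up numbers).
user_features    = np.array([[1.2, 0.8],    # user 1
                             [0.3, 1.5]])   # user 2
product_features = np.array([[2.0, 1.0],    # movie 1
                             [0.5, 2.5]])   # movie 2

# Each predicted rating is the dot product of a user row with a movie row.
print(user_features.dot(product_features.T))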
When generating the Product Features and User Features, the ALS algorithm works approximately like this: it initialises the product features with random values, then alternates between holding the product features fixed while solving a least squares problem for the user features, and holding the user features fixed while solving a least squares problem for the product features.
After each iteration the least squares error is lower than in the previous iteration. The optimum number of iterations can be determined by experimentation.
If the ALS algorithm solved the problem with plain least squares alone, the generated User and Product Features could overfit the data. The lambda parameter adds regularisation to prevent overfitting, and its optimum value can also be determined by experimentation.
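As a rough sketch of that experimentation (the split fractions, seed and candidate lambda values below are arbitrary choices, not from the original notebook), we could hold out some ratings and compare the root mean squared error for a few lambda values:
In [ ]:
from math import sqrt
from pyspark.mllib.recommendation import ALS

# Hold out 20% of the ratings as a test set.
trainRDD, testRDD = ratingsRDD.randomSplit([0.8, 0.2], seed=42)
testPairs = testRDD.map(lambda r: (r.user, r.product))
actual = testRDD.map(lambda r: ((r.user, r.product), r.rating))

for lam in [0.01, 0.1, 1.0]:
    m = ALS.train(trainRDD, rank=5, iterations=20, lambda_=lam)
    predicted = m.predictAll(testPairs).map(lambda r: ((r.user, r.product), r.rating))
    mse = actual.join(predicted).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2).mean()
    print('lambda = %s, RMSE = %s' % (lam, sqrt(mse)))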
In [66]:
from pyspark.mllib.recommendation import ALS
# rank is the number of latent features
rank = 5
numIterations = 20
lambdaParam = 0.1
model = ALS.train(ratings, rank, numIterations, lambdaParam)
Let's take a peek at the user features for user IDs 1 and 2
In [67]:
model.userFeatures().sortByKey().take(2)
Out[67]:
Let's take a peek at the product features for movie IDs 1 and 2
In [68]:
model.productFeatures().sortByKey().take(2)
Out[68]:
We can now use linear algebra to predict the rating 'manually' directly from the user and product features for user ID=1 and movie ID=1:
In [69]:
import numpy as np
np.dot(
    model.userFeatures().sortByKey().take(1)[0][1],
    model.productFeatures().sortByKey().take(1)[0][1]
)
Out[69]:
Spark provides an API so we don't have to perform this calculation manually.
Let's use the Spark API to predict a rating:
In [70]:
model.predict(user=1, product=1)
Out[70]:
Spark also has APIs for bulk calculations, such as predicting the top N products for all users. Let's look at the recommendations for two users (.take(2)):
In [71]:
top_ten_for_all_users = model.recommendProductsForUsers(10).toDF().take(2)
Pretty print the recommendations
In [72]:
for user in top_ten_for_all_users:
    print('')
    for prediction in user['_2']:
        print(prediction)
In the production web application, we predict the top 10 products for all users every hour and we populate the recommendations in a Cloudant database where they can be easily accessed by the web application code.
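A rough sketch of that population step is shown below. The credentials, URL and database name are placeholders, it assumes the python-cloudant client, and it ignores document revisions (re-running it against existing documents would need conflict handling):
In [ ]:
from cloudant.client import Cloudant

# Placeholder credentials and database name - replace with your own.
client = Cloudant('username', 'password', url='https://account.cloudant.com', connect=True)
db = client.create_database('recommendations', throw_on_exists=False)

# Build one document per user containing their top 10 recommendations.
docs = []
for user_id, recs in model.recommendProductsForUsers(10).collect():
    docs.append({
        '_id': str(user_id),
        'recommendations': [{'product': r.product, 'rating': r.rating} for r in recs]
    })

db.bulk_docs(docs)
client.disconnect()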